/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.metamodel.cassandra;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.SimpleDataSetHeader;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Select.Selection;

/**
 * DataContext implementation for Apache Cassandra database.
 *
 * When instantiating this DataContext, a keyspace name is provided. In
 * Cassandra, the keyspace is the container for your application data, similar
 * to a schema in a relational database. Keyspaces are used to group column
 * families together.
 * 
 * This implementation supports either automatic discovery of a schema or manual
 * specification of a schema, through the {@link SimpleTableDef} class.
 *
 */
public class CassandraDataContext extends QueryPostprocessDataContext implements DataContext {

    private static final Logger logger = LoggerFactory.getLogger(CassandraDataContext.class);
    private final Cluster cassandraCluster;
    private final SimpleTableDef[] tableDefs;
    private final String keySpaceName;

    /**
     * Constructs a {@link CassandraDataContext}. This constructor accepts a
     * custom array of {@link SimpleTableDef}s which allows the user to define
     * his own view on the indexes in the engine.
     *
     * @param cluster
     *            the Cassandra cluster
     * @param keySpace
     *            the name of the Cassandra keyspace
     * @param tableDefs
     *            an array of {@link SimpleTableDef}s, which define the table
     *            and column model of the ElasticSearch index.
     */
    public CassandraDataContext(Cluster cluster, String keySpace, SimpleTableDef... tableDefs) {
        super(false);
        this.cassandraCluster = cluster;
        this.keySpaceName = keySpace;
        this.tableDefs = tableDefs;
    }

    /**
     * Constructs a {@link CassandraDataContext} and automatically detects the
     * schema structure/view on the keyspace (see
     * {@link #detectSchema(Cluster, String)}).
     *
     * @param cluster
     *            the Cassandra cluster
     * @param keySpace
     *            the name of the Cassandra keyspace to represent
     */
    public CassandraDataContext(Cluster cluster, String keySpace) {
        this(cluster, keySpace, detectSchema(cluster, keySpace));
    }

    /**
     * Performs an analysis of the given keyspace in a Cassandra cluster
     * {@link Cluster} instance and detects the cassandra types structure based
     * on the metadata provided by the datastax cassandra java client.
     *
     * @see #detectTable(TableMetadata)
     *
     * @param cluster
     *            the cluster to inspect
     * @param keyspaceName
     * @return a mutable schema instance, useful for further fine tuning by the
     *         user.
     */
    public static SimpleTableDef[] detectSchema(Cluster cluster, String keyspaceName) {
        final Metadata metadata = cluster.getMetadata();
        final KeyspaceMetadata keyspace = metadata.getKeyspace(keyspaceName);
        if (keyspace == null) {
            throw new IllegalArgumentException("Keyspace '" + keyspaceName + "' does not exist in the database");
        }
        final Collection<TableMetadata> tables = keyspace.getTables();
        final SimpleTableDef[] result = new SimpleTableDef[tables.size()];
        int i = 0;
        for (final TableMetadata tableMetaData : tables) {
            final SimpleTableDef table = detectTable(tableMetaData);
            result[i] = table;
            i++;
        }
        return result;
    }

    /**
     * Performs an analysis of an available table in Cassandra.
     *
     * @param tableMetaData
     *            the table meta data
     * @return a table definition for cassandra.
     */
    public static SimpleTableDef detectTable(TableMetadata tableMetaData) {
        final List<ColumnMetadata> columns = tableMetaData.getColumns();
        final String[] columnNames = new String[columns.size()];
        final ColumnType[] columnTypes = new ColumnType[columns.size()];
        int i = 0;
        for (final ColumnMetadata column : columns) {
            columnNames[i] = column.getName();
            columnTypes[i] = getColumnTypeFromMetaDataField(column.getType().getName());
            i++;
        }

        return new SimpleTableDef(tableMetaData.getName(), columnNames, columnTypes);
    }

    @Override
    protected Schema getMainSchema() throws MetaModelException {
        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
        for (final SimpleTableDef tableDef : tableDefs) {
            final MutableTable table = tableDef.toTable().setSchema(theSchema);

            final TableMetadata cassandraTable = cassandraCluster.getMetadata().getKeyspace(keySpaceName).getTable(table
                    .getName());
            if (cassandraTable != null) {
                final List<ColumnMetadata> primaryKeys = cassandraTable.getPrimaryKey();
                for (ColumnMetadata primaryKey : primaryKeys) {
                    final MutableColumn column = (MutableColumn) table.getColumnByName(primaryKey.getName());
                    if (column != null) {
                        column.setPrimaryKey(true);
                    }
                    column.setNativeType(primaryKey.getType().getName().name());
                }
            }

            theSchema.addTable(table);
        }
        return theSchema;
    }

    @Override
    protected String getMainSchemaName() throws MetaModelException {
        return keySpaceName;
    }

    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
        final Select query = QueryBuilder.select().all().from(keySpaceName, table.getName());
        if (limitMaxRowsIsSet(maxRows)) {
            query.limit(maxRows);
        }
        final ResultSet resultSet = cassandraCluster.connect().execute(query);

        final Iterator<Row> response = resultSet.iterator();
        return new CassandraDataSet(response, columns);
    }

    private boolean limitMaxRowsIsSet(int maxRows) {
        return (maxRows != -1);
    }

    @Override
    protected org.apache.metamodel.data.Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems,
            Column primaryKeyColumn, Object keyValue) {
        
        if (primaryKeyColumn.getType() == ColumnType.UUID && keyValue instanceof String) {
            keyValue = UUID.fromString(keyValue.toString());
        }

        Selection select = QueryBuilder.select();
        for (SelectItem selectItem : selectItems) {
            final Column column = selectItem.getColumn();
            assert column != null;
            select = select.column(column.getName());
        }

        final Statement statement = select.from(keySpaceName, table.getName()).where(QueryBuilder.eq(primaryKeyColumn
                .getName(), keyValue));

        final Row row = cassandraCluster.connect().execute(statement).one();

        return CassandraUtils.toRow(row, new SimpleDataSetHeader(selectItems));
    }

    @Override
    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
        if (!whereItems.isEmpty()) {
            // not supported - will have to be done by counting client-side
            logger.debug(
                    "Not able to execute count query natively - resorting to query post-processing, which may be expensive");
            return null;
        }
        final Statement statement = QueryBuilder.select().countAll().from(keySpaceName, table.getName());
        final Row response = cassandraCluster.connect().execute(statement).one();
        return response.getLong(0);
    }

    private static ColumnType getColumnTypeFromMetaDataField(DataType.Name metaDataName) {
        switch (metaDataName) {
        case BIGINT:
        case COUNTER:
            return ColumnType.BIGINT;
        case BLOB:
            return ColumnType.BLOB;
        case BOOLEAN:
            return ColumnType.BOOLEAN;
        case DECIMAL:
            return ColumnType.DECIMAL;
        case DOUBLE:
            return ColumnType.DOUBLE;
        case FLOAT:
            return ColumnType.FLOAT;
        case INT:
            return ColumnType.INTEGER;
        case TEXT:
            return ColumnType.STRING;
        case TIMESTAMP:
            return ColumnType.TIMESTAMP;
        case UUID:
            return ColumnType.UUID;
        case VARCHAR:
            return ColumnType.VARCHAR;
        case VARINT:
            return ColumnType.BIGINT;
        case LIST:
            return ColumnType.LIST;
        case MAP:
            return ColumnType.MAP;
        case CUSTOM:
            return ColumnType.OTHER;
        case INET:
            return ColumnType.INET;
        case SET:
            return ColumnType.SET;
        default:
            return ColumnType.STRING;
        }
    }
}
